package com.atguigu.flink.sql.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

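/**
 * Created by Smexy on 2023/9/16
 *
 * Windowing TVFs support GROUPING SETS: within a single window, the GROUPING SETS
 * syntax (and its ROLLUP / CUBE shorthands) can be used for multi-dimensional aggregation.
 */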


public class Demo4_TVFGroupingSetsWindow
{
    /**
     * Reads text lines from a socket, maps them to {@link WaterSensor} records, registers
     * the stream as table {@code t1} and prints the result of a 5-second tumbling-window
     * query that aggregates {@code SUM(vc)} with {@code CUBE(id, ts)} inside each window.
     *
     * @param args command-line arguments (unused)
     * @throws Exception kept in the signature for backward compatibility with callers
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Single parallel instance so the printed output is not interleaved across subtasks.
        env.setParallelism(1);

        // Source: one text line per record from hadoop102:8888, converted to WaterSensor.
        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        // Table schema: the three physical columns plus two computed time attributes.
        Schema schema = Schema.newBuilder()
            .column("id", "STRING")
            .column("ts", "BIGINT")
            .column("vc", "INT")
            // Processing-time attribute (declared but not used by the query below).
            .columnByExpression("pt", "PROCTIME()")
            // Event-time attribute: ts is interpreted with precision 3 (milliseconds).
            .columnByExpression("et", "TO_TIMESTAMP_LTZ(ts,3)")
            // Watermark trails the event time by 1 ms.
            .watermark("et", "et - INTERVAL '0.001' SECOND")
            .build();

        tableEnv.createTemporaryView("t1", ds, schema);

        // Window aggregation over the TUMBLE table-valued function. After grouping by the
        // window bounds, an enhanced aggregation is applied inside each window.
        // CUBE(id, ts) expands to the grouping sets (id, ts), (id), (ts) and ().
        // Alternatives that can be swapped in for the last line of the statement:
        //   "  grouping sets ( (id) ,(ts) ) "
        //   "   rollup ( id ,ts ) "
        String tumbleSql = " SELECT window_start, window_end,id,ts, SUM(vc) sumVc " +
            "  FROM TABLE(" +
            "    TUMBLE(TABLE t1, DESCRIPTOR(et), INTERVAL '5' SECOND ))" +
            "  GROUP BY window_start, window_end ," +
            "   cube ( id ,ts ) ";

        // execute() submits the job through the table environment and print() blocks while
        // results arrive. No additional env.execute() is issued: it could only run after
        // this query has ended and would then try to submit the DataStream topology a
        // second time, so it is redundant here.
        tableEnv.sqlQuery(tumbleSql).execute().print();
    }
}
